diff --git a/django/db/models/fields/related_descriptors.py b/django/db/models/fields/related_descriptors.py
index c5d87f647a..11d92504be 100644
--- a/django/db/models/fields/related_descriptors.py
+++ b/django/db/models/fields/related_descriptors.py
@@ -1183,242 +1183,18 @@ def create_forward_many_to_many_manager(superclass, rel, reverse):
 
         set.alters_data = True
 
-        def create(self, *, through_defaults=None, **kwargs):
-            db = router.db_for_write(self.instance.__class__, instance=self.instance)
-            new_obj = super(ManyRelatedManager, self.db_manager(db)).create(**kwargs)
-            self.add(new_obj, through_defaults=through_defaults)
-            return new_obj
+        async def aget_or_create(self, **kwargs):
+            # Ensure we call the get_or_create method of the related manager, not the QuerySet.
+            manager = self._get_manager_for_async_methods()
+            return await sync_to_async(manager.get_or_create)(**kwargs)
 
-        create.alters_data = True
-
-        def get_or_create(self, *, through_defaults=None, **kwargs):
-            db = router.db_for_write(self.instance.__class__, instance=self.instance)
-            obj, created = super(ManyRelatedManager, self.db_manager(db)).get_or_create(
-                **kwargs
-            )
-            # We only need to add() if created because if we got an object back
-            # from get() then the relationship already exists.
-            if created:
-                self.add(obj, through_defaults=through_defaults)
-            return obj, created
-
-        get_or_create.alters_data = True
-
-        def update_or_create(self, *, through_defaults=None, **kwargs):
-            db = router.db_for_write(self.instance.__class__, instance=self.instance)
-            obj, created = super(
-                ManyRelatedManager, self.db_manager(db)
-            ).update_or_create(**kwargs)
-            # We only need to add() if created because if we got an object back
-            # from get() then the relationship already exists.
-            if created:
-                self.add(obj, through_defaults=through_defaults)
-            return obj, created
-
-        update_or_create.alters_data = True
-
-        def _get_target_ids(self, target_field_name, objs):
-            """
-            Return the set of ids of `objs` that the target field references.
-            """
-            from django.db.models import Model
-
-            target_ids = set()
-            target_field = self.through._meta.get_field(target_field_name)
-            for obj in objs:
-                if isinstance(obj, self.model):
-                    if not router.allow_relation(obj, self.instance):
-                        raise ValueError(
-                            'Cannot add "%r": instance is on database "%s", '
-                            'value is on database "%s"'
-                            % (obj, self.instance._state.db, obj._state.db)
-                        )
-                    target_id = target_field.get_foreign_related_value(obj)[0]
-                    if target_id is None:
-                        raise ValueError(
-                            'Cannot add "%r": the value for field "%s" is None'
-                            % (obj, target_field_name)
-                        )
-                    target_ids.add(target_id)
-                elif isinstance(obj, Model):
-                    raise TypeError(
-                        "'%s' instance expected, got %r"
-                        % (self.model._meta.object_name, obj)
-                    )
-                else:
-                    target_ids.add(target_field.get_prep_value(obj))
-            return target_ids
-
-        def _get_missing_target_ids(
-            self, source_field_name, target_field_name, db, target_ids
-        ):
-            """
-            Return the subset of ids of `objs` that aren't already assigned to
-            this relationship.
-            """
-            vals = (
-                self.through._default_manager.using(db)
-                .values_list(target_field_name, flat=True)
-                .filter(
-                    **{
-                        source_field_name: self.related_val[0],
-                        "%s__in" % target_field_name: target_ids,
-                    }
-                )
-            )
-            return target_ids.difference(vals)
-
-        def _get_add_plan(self, db, source_field_name):
-            """
-            Return a boolean triple of the way the add should be performed.
-
-            The first element is whether or not bulk_create(ignore_conflicts)
-            can be used, the second whether or not signals must be sent, and
-            the third element is whether or not the immediate bulk insertion
-            with conflicts ignored can be performed.
-            """
-            # Conflicts can be ignored when the intermediary model is
-            # auto-created as the only possible collision is on the
-            # (source_id, target_id) tuple. The same assertion doesn't hold for
-            # user-defined intermediary models as they could have other fields
-            # causing conflicts which must be surfaced.
-            can_ignore_conflicts = (
-                self.through._meta.auto_created is not False
-                and connections[db].features.supports_ignore_conflicts
-            )
-            # Don't send the signal when inserting duplicate data row
-            # for symmetrical reverse entries.
-            must_send_signals = (
-                self.reverse or source_field_name == self.source_field_name
-            ) and (signals.m2m_changed.has_listeners(self.through))
-            # Fast addition through bulk insertion can only be performed
-            # if no m2m_changed listeners are connected for self.through
-            # as they require the added set of ids to be provided via
-            # pk_set.
-            return (
-                can_ignore_conflicts,
-                must_send_signals,
-                (can_ignore_conflicts and not must_send_signals),
-            )
+        aget_or_create.alters_data = True
 
-        def _add_items(
-            self, source_field_name, target_field_name, *objs, through_defaults=None
-        ):
-            # source_field_name: the PK fieldname in join table for the source object
-            # target_field_name: the PK fieldname in join table for the target object
-            # *objs - objects to add. Either object instances, or primary keys
-            # of object instances.
-            if not objs:
-                return
-
-            through_defaults = dict(resolve_callables(through_defaults or {}))
-            target_ids = self._get_target_ids(target_field_name, objs)
-            db = router.db_for_write(self.through, instance=self.instance)
-            can_ignore_conflicts, must_send_signals, can_fast_add = self._get_add_plan(
-                db, source_field_name
-            )
-            if can_fast_add:
-                self.through._default_manager.using(db).bulk_create(
-                    [
-                        self.through(
-                            **{
-                                "%s_id" % source_field_name: self.related_val[0],
-                                "%s_id" % target_field_name: target_id,
-                            }
-                        )
-                        for target_id in target_ids
-                    ],
-                    ignore_conflicts=True,
-                )
-                return
-
-            missing_target_ids = self._get_missing_target_ids(
-                source_field_name, target_field_name, db, target_ids
-            )
-            with transaction.atomic(using=db, savepoint=False):
-                if must_send_signals:
-                    signals.m2m_changed.send(
-                        sender=self.through,
-                        action="pre_add",
-                        instance=self.instance,
-                        reverse=self.reverse,
-                        model=self.model,
-                        pk_set=missing_target_ids,
-                        using=db,
-                    )
-                # Add the ones that aren't there already.
-                self.through._default_manager.using(db).bulk_create(
-                    [
-                        self.through(
-                            **through_defaults,
-                            **{
-                                "%s_id" % source_field_name: self.related_val[0],
-                                "%s_id" % target_field_name: target_id,
-                            },
-                        )
-                        for target_id in missing_target_ids
-                    ],
-                    ignore_conflicts=can_ignore_conflicts,
-                )
+        async def aupdate_or_create(self, **kwargs):
+            # Ensure we call the update_or_create method of the related manager, not the QuerySet.
+            manager = self._get_manager_for_async_methods()
+            return await sync_to_async(manager.update_or_create)(**kwargs)
 
-                if must_send_signals:
-                    signals.m2m_changed.send(
-                        sender=self.through,
-                        action="post_add",
-                        instance=self.instance,
-                        reverse=self.reverse,
-                        model=self.model,
-                        pk_set=missing_target_ids,
-                        using=db,
-                    )
-
-        def _remove_items(self, source_field_name, target_field_name, *objs):
-            # source_field_name: the PK colname in join table for the source object
-            # target_field_name: the PK colname in join table for the target object
-            # *objs - objects to remove. Either object instances, or primary
-            # keys of object instances.
-            if not objs:
-                return
-
-            # Check that all the objects are of the right type
-            old_ids = set()
-            for obj in objs:
-                if isinstance(obj, self.model):
-                    fk_val = self.target_field.get_foreign_related_value(obj)[0]
-                    old_ids.add(fk_val)
-                else:
-                    old_ids.add(obj)
-
-            db = router.db_for_write(self.through, instance=self.instance)
-            with transaction.atomic(using=db, savepoint=False):
-                # Send a signal to the other end if need be.
-                signals.m2m_changed.send(
-                    sender=self.through,
-                    action="pre_remove",
-                    instance=self.instance,
-                    reverse=self.reverse,
-                    model=self.model,
-                    pk_set=old_ids,
-                    using=db,
-                )
-                target_model_qs = super().get_queryset()
-                if target_model_qs._has_filters():
-                    old_vals = target_model_qs.using(db).filter(
-                        **{"%s__in" % self.target_field.target_field.attname: old_ids}
-                    )
-                else:
-                    old_vals = old_ids
-                filters = self._build_remove_filters(old_vals)
-                self.through._default_manager.using(db).filter(filters).delete()
-
-                signals.m2m_changed.send(
-                    sender=self.through,
-                    action="post_remove",
-                    instance=self.instance,
-                    reverse=self.reverse,
-                    model=self.model,
-                    pk_set=old_ids,
-                    using=db,
-                )
+        aupdate_or_create.alters_data = True
 
     return ManyRelatedManager
